/*
 * Copyright (c) 2021 Shenzhen Kaihong Digital Industry Development Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.view.tempcontrol.slice;

import cn.hutool.json.JSONUtil;
import com.view.tempcontrol.ResourceTable;
import com.view.tempcontrol.bean.PropertiesBean;
import com.view.tempcontrol.bean.ServicesBean;
import com.view.tempcontrol.bean.TempDataBean;
import com.view.tempcontrol.utils.ToolUtils;
import ohos.aafwk.ability.AbilitySlice;
import ohos.aafwk.content.Intent;
import ohos.agp.components.Text;
import ohos.app.dispatcher.task.TaskPriority;
import ohos.hiviewdfx.HiLog;
import ohos.hiviewdfx.HiLogLabel;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.TransportSupport;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DetailAbilitySlice extends AbilitySlice {

    private static final HiLogLabel LABEL_LOG = new HiLogLabel(HiLog.LOG_APP, 0x11101, "MainAbilitySlice");

    /**
     * 异步线程池，参数可以根据业务特点作调整，也可以用其他异步方式来处理。
     */
    private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60,
            TimeUnit.SECONDS, new LinkedBlockingQueue<>(5000));

    /**
     * 连接凭证接入键值
     */
    private static String accessKey = "********";

    /**
     * 连接凭证接入码
     */
    private static String pw = "********************************";

    /**
     * 队列名，可以使用默认队列DefaultQueue
     */
    private static String queueName = "temp1234";
//    private static String queueName = "Park1234";

    /**
     * 按照qpid-jms的规范，组装连接URL。
     */
    private static String connectionUrl = "amqps://a16103bd0e.iot-amqps.cn-north-4.myhuaweicloud.com:5671?amqp.vhost=default&amqp.idleTimeout=8000&amqp.saslMechanisms=PLAIN";

//    private static String connectionUrl = "amqps://${UUCID}.iot-amqps.cn-north-4.myhuaweicloud.com:5671?amqp.vhost=default&amqp.idleTimeout=8000&amqp.saslMechanisms=PLAIN";

    private static Connection connection;
    private static Session session;
    private static MessageConsumer consumer;
    private static MessageProducer producer;
//    private static long lastReconnectTime = 0;
    private static AtomicBoolean isReconnecting = new AtomicBoolean(false);


    //页面组件
    String mEventTime = null;
    Text mDisplayTemp = null;
    Text mTempHigh = null;
    Text mTempLow = null;

    @Override
    protected void onStart(Intent intent) {
        super.onStart(intent);
        super.setUIContent(ResourceTable.Layout_ability_detail);

        String user = (String) intent.getParams().getParam("user");
        HiLog.info(LABEL_LOG, "user = " + user);

        initUI();

        getGlobalTaskDispatcher(TaskPriority.DEFAULT).asyncDispatch(new Runnable() {
            @Override
            public void run() {
                amqpConnect();
            }
        });
    }

    @Override
    protected void onStop() {
        super.onStop();
        shutdown();
    }

    void initUI() {
        mDisplayTemp = (Text) findComponentById(ResourceTable.Id_display_temp);
        mTempHigh = (Text) findComponentById(ResourceTable.Id_temp_high_warn);
        mTempLow = (Text) findComponentById(ResourceTable.Id_temp_low_warn);
    }

    void amqpConnect() {
        HiLog.info(LABEL_LOG, "amqpTest");
        try {
            createConsumer();
        } catch (Exception e) {
            //异常情况，重新连接
            reconnect();
        }

        // 2, 添加监听，参照consumer.setMessageListener(messageListener), 服务端主动推数据给客户端，但得考虑接受的数据速率是客户能力能够承受住的
        try {
            consumer.setMessageListener(messageListener);
        } catch (JMSException e) {
            HiLog.error(LABEL_LOG, "amqpTest setMessageListener failed!");
        }
    }

    private void createConsumer() throws Exception {
        HiLog.info(LABEL_LOG, "createConsumer");
        long timeStamp = System.currentTimeMillis();
        //UserName组装方法，请参见文档：AMQP客户端接入说明。
        String userName = "accessKey=" + accessKey + "|timestamp=" + timeStamp + "|";
        HiLog.info(LABEL_LOG, "userName = " + userName);
        HiLog.info(LABEL_LOG, "password = " + pw);
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.HwConnectionURL", connectionUrl);
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        JmsConnectionFactory cf = (JmsConnectionFactory) context.lookup("HwConnectionURL");

        HiLog.info(LABEL_LOG, "createConsumer 222");
        //信任服务端
        TransportOptions to = new TransportOptions();
        to.setTrustAll(true);
        cf.setSslContext(TransportSupport.createJdkSslContext(to));

        HiLog.info(LABEL_LOG, "createConsumer 333");


        // 创建连接
        Connection connection = cf.createConnection(userName, pw);


        HiLog.info(LABEL_LOG, "createConsumer aaa");

        ((JmsConnection) connection).
                addConnectionListener(myJmsConnectionListener);

        HiLog.info(LABEL_LOG, "createConsumer 444");


        connection.setExceptionListener(exceptionListener);
        HiLog.info(LABEL_LOG, "createConsumer 555");
        connection.start();
        HiLog.info(LABEL_LOG, "createConsumer 666");

        // 创建 Session
        // Session.CLIENT_ACKNOWLEDGE: 收到消息后，需要手动调用message.acknowledge()。
        // Session.AUTO_ACKNOWLEDGE: SDK自动ACK（推荐）。
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        HiLog.info(LABEL_LOG, "createConsumer 777");

        // 创建 consumer
        consumer = session.createConsumer(new JmsQueue(queueName));


        HiLog.info(LABEL_LOG, "createConsumer 888");
    }

    /**
     * 客户端断开需要重连
     */
    private void reconnect() {
        if (isReconnecting.compareAndSet(false, true)) {
            while (true) {
                try {
                    // 防止重连次数太多，重连时间间隔15s
//                    if (System.currentTimeMillis() - lastReconnectTime < 15 * 1000L) {
//                        Thread.sleep(15 * 1000L);
//                    }
                    Thread.sleep(15 * 1000L);
                    shutdown();
                    createConsumer();
//                    lastReconnectTime = System.currentTimeMillis();
                    isReconnecting.set(false);
                    break;
                } catch (Exception e) {
//                    lastReconnectTime = System.currentTimeMillis();
                    System.out.println("reconnect hand an exception: " + e.getMessage());
                }
            }
        }

    }

    /**
     * 在这里处理您收到消息后的具体业务逻辑。
     */
    private void processMessage(Message message) {
        if (null == message) {
            HiLog.info(LABEL_LOG, "processMessage message is null");
            return;
        }

        int temp = 0;
        List<ServicesBean> list = null;

        try {
            //接收到的数据
            String content = message.getBody(String.class);

            System.out.println("receive an message, the content is " + content);

            TempDataBean bean = JSONUtil.toBean(content, TempDataBean.class);

            if (null != bean.getNotify_data() && null != bean.getNotify_data().getBody()) {
                list = bean.getNotify_data().getBody().getServices();

                if (null != list && list.size() > 0) {
                    String dateStr = list.get(0).getEvent_time();
                    // 时间上报时间"2020-01-23 12:23:56"
                    mEventTime = ToolUtils.getDateStr(dateStr);
                    PropertiesBean propertiesBean = list.get(0).getProperties();

                    temp = propertiesBean.getTempday0();
                    HiLog.error(LABEL_LOG, "processMessage message temp = " + String.valueOf(temp));
                    HiLog.error(LABEL_LOG, "processMessage message mEventTime = " + mEventTime);
                }
            } else {
                HiLog.error(LABEL_LOG, "processMessage message is exception!");
            }

            //处理数据结束后，需要更新UI
            updateUI(temp);


        } catch (Exception e) {
            System.out.println("processMessage occurs error: " + e.getMessage());
        }
    }

    private void updateUI(int temp) {
        getUITaskDispatcher().asyncDispatch(new Runnable() {
            @Override
            public void run() {
                //UI更新操作在此
                if (temp >= 30) {
                    //高温报警
                    if (mEventTime!=null) {
                        mTempHigh.setText(mEventTime.substring(5, 16));
                    }

                } else if (temp <= 5) {
                    if (mEventTime != null) {
                        mTempLow.setText(mEventTime.substring(5, 16));
                    }
                }
                mDisplayTemp.setText(temp + "℃");
            }
        });
    }

    private static void shutdown() {
        HiLog.info(LABEL_LOG, "shutdown");
        if (consumer != null) {
            try {
                consumer.close();
            } catch (JMSException e) {
                HiLog.error(LABEL_LOG, "consumer close failed!");
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                HiLog.error(LABEL_LOG, "session close failed!");
            }
        }
        if (connection != null) {
            try {
                connection.setExceptionListener(null);
                connection.close();
            } catch (JMSException e) {
                HiLog.error(LABEL_LOG, "connection close failed!");
            }
        }
    }

    private MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                HiLog.info(LABEL_LOG, "onMessage message = " + message.getBody(String.class));
                // 建议异步处理收到的消息，确保onMessage函数里没有耗时逻辑。
                // 如果业务处理耗时过程过长阻塞住线程，可能会影响SDK收到消息后的正常回调。
                executorService.submit(() -> processMessage(message));
            } catch (Exception e) {
                System.out.println("submit task occurs exception: " + e.getMessage());
            }
        }
    };

    private ExceptionListener exceptionListener = new ExceptionListener() {
        @Override
        public void onException(JMSException e) {
            System.out.println("connection has an exception:" + e);
            // 链接发生异常，需要重连
            reconnect();
        }
    };

    private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * 连接成功建立。
         */
        @Override
        public void onConnectionEstablished(URI remoteURI) {
            HiLog.error(LABEL_LOG, "onConnectionEstablished, remoteUri:" + remoteURI);
            System.out.println("onConnectionEstablished, remoteUri:" + remoteURI);
        }

        /**
         * 尝试过最大重试次数之后，最终连接失败。
         */
        @Override
        public void onConnectionFailure(Throwable error) {
            HiLog.error(LABEL_LOG, "onConnectionFailure, " + error.getMessage());
            System.out.println("onConnectionFailure, " + error.getMessage());
        }

        /**
         * 连接中断。
         */
        @Override
        public void onConnectionInterrupted(URI remoteURI) {
            HiLog.error(LABEL_LOG, "onConnectionInterrupted, remoteUri:" + remoteURI);
            System.out.println("onConnectionInterrupted, remoteUri:" + remoteURI);
        }

        /**
         * 连接中断后又自动重连上。
         */
        @Override
        public void onConnectionRestored(URI remoteURI) {
            HiLog.error(LABEL_LOG, "onConnectionRestored, remoteUri:" + remoteURI);
            System.out.println("onConnectionRestored, remoteUri:" + remoteURI);
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {
            System.out.println("onInboundMessage, " + envelope);
        }

        @Override
        public void onSessionClosed(Session session, Throwable cause) {
            System.out.println("onSessionClosed, session=" + session + ", cause =" + cause);
        }

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
            System.out.println("MessageConsumer, consumer=" + consumer + ", cause =" + cause);
        }

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {
            System.out.println("MessageProducer, producer=" + producer + ", cause =" + cause);
        }
    };

}
